Kiểm tra và cài đặt Maven

Cài đặt bằng lệnh

sudo apt update
sudo apt install maven

Kiểm tra

mvn -version

Nếu cài đặt đúng thì sẽ trả về

Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 1.8.0_442, vendor: Private Build, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: ANSI_X3.4-1968
OS name: "linux", version: "5.15.167.4-microsoft-standard-wsl2", arch: "amd64", family: "unix"

Pasted image 20250401113557.png

Tạo Project bằng Maven

Khởi tạo

mvn archetype:generate -DgroupId=com.example -DartifactId=SparkApp -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
  • Lưu ý: nếu bị lỗi có thể do thiếu quyền ở thư mục gì đó. Hãy thử tạo ở home
    Pasted image 20250401115534.png

sau đó sẽ được thư mục có cấu trúc như sau
Pasted image 20250401115610.png

Config

Thêm vào pom.xml

	<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>
    
    <dependency>
	    <groupId>org.apache.spark</groupId>
	    <artifactId>spark-core_2.12</artifactId>
	    <version>3.5.3</version>
	</dependency>

Pasted image 20250401120127.png

Config java

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>

Full file

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>SparkApp</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>SparkApp</name>

  <!-- FIXME change it to the project's website -->

  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

  <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>

  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>

        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>

        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>

        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>

        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>

        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->

        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>

        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>

  

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>

        </plugin>

      </plugins>

    </pluginManagement>

  </build>

</project>

Word Count

Code và dữ liệu

xin chao cac ban xin chao Hadoop Bid Data minh xin tu gioi thieu minh len la Luu Vinh Tuong
package spark.main;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
         SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Spark Word Count");

        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            String inputPath = "hdfs://vinhtuong-master:9000/input/input_1.txt";
            String outputPath = "hdfs://vinhtuong-master:9000/output/result";

            JavaRDD<String> textFile = sc.textFile(inputPath).cache();

            JavaPairRDD<String, Integer> wordCounts = textFile
                    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
                    .mapToPair(word -> new Tuple2<>(word.replaceAll("[^a-zA-Z]", "").toLowerCase(), 1))
                    .reduceByKey(Integer::sum);

            wordCounts.coalesce(1).saveAsTextFile(outputPath);

            System.out.println("Word Count completed. Output saved to " + outputPath);
        }
    }
}

Đóng gói

mvn clean package

Pasted image 20250401124029.png

Sau khi đóng gói, chúng ta sẽ được thư mục target như sau:
Pasted image 20250401124209.png

Chạy file

spark-submit --class spark.main.WordCount --master local[*] target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401125946.png

Check output

Pasted image 20250401130011.png
Pasted image 20250401130059.png

Spark SQL

Code

name,age,city
Alice,30,New York
Bob,25,Los Angeles
Charlie,35,Chicago
David,40,Houston
Emma,22,San Francisco
Frank,28,Seattle
Grace,33,Boston
Henry,27,Denver
package spark.main;

import org.apache.spark.sql.*;

public class SparkSQLExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Spark SQL Example")
                .master("local[*]")
                .getOrCreate();

            Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("hdfs://vinhtuong-master:9000/input/people.csv");


        System.out.println("Du lieu ban dau:");
        df.show();

        System.out.println("Schema:");
        df.printSchema();

        System.out.println("Chon name va age:");
        df.select("name", "age").show();

        System.out.println("Loc nhung nguoi tren 25 tuoi:");
        df.filter("age > 25").show();

        System.out.println("Nhom theo so tuoi va dem so nguoi:");
        df.groupBy("age").count().show();

        df.write().mode("overwrite").csv("hdfs://vinhtuong-master:9000/output/people");


        spark.stop();
    }
}

Đóng gói

mvn clean package

Pasted image 20250401133149.png
Pasted image 20250401134218.png

Chạy file

spark-submit --class spark.main.SparkSQLExample --master local[*] target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401142737.png

Check output

  • Output bên hdfs sẽ không có gì. Vì những thao tác mình không lưu lại (check trên code) mà chỉ show ra màn hình
    Pasted image 20250401142806.png

  • Nên sẽ check file thực thi ở mục trên
    Pasted image 20250401143054.png
    Pasted image 20250401143106.png
    Pasted image 20250401143114.png
    Pasted image 20250401143122.png
    Pasted image 20250401143157.png
    Pasted image 20250401143215.png

Phân tích dữ liệu bán lẻ với Spark SQL

Chuẩn bị dữ liệu

Lấy dữ liệu tại đây.
Pasted image 20250401145826.png

Có tổng bao nhiêu giao dịch, sản phẩm và khách hàng khác nhau?

Code

package spark.main;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class part_1 {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("Part-1")
				.master("local")
				.getOrCreate();
		
		Dataset<Row> data = spark.read()
				.option("inferSchema", true)
				.option("header", true)
				.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
				
		// number of customer distinct
		// except 1 because there are value is null of information customer ID
		long cntCustomers = data.select("CustomerID").distinct().count() - 1; 
		
		// number of product distinct
		long cntProdcts = data.select("StockCode").distinct().count();
		
		// number of invoice distinct
		long cntInvoices = data.select("InvoiceNo").distinct().count();
		
		// print 
		System.out.println("Number of customer distinct: " + cntCustomers); 
		System.out.println("Number of product distinct: " + cntProdcts);
		System.out.println("Number of invoice distinct: " + cntInvoices);
		
	}
}

Đóng gói

mvn clean package

Pasted image 20250401151545.png

Chạy file

spark-submit --class spark.main.part_1 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401151511.png

Check output

  • Vì bài này không lưu vào hdfs mà chỉ hiển ra màn hình nên chỉ xem lúc chạy file
    Pasted image 20250401151337.png

Tỉ lệ khách hàng không có thông tin

Code

package spark.main;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class part_2 {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("Part-2")
				.master("local")
				.getOrCreate();
		
		Dataset<Row> data = spark.read()
				.option("inferSchema", true)
				.option("header", true)
				.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
				
		// get number of customer (done part 1)
		long cntCustomers = data.select("CustomerID").count();
		
		// get number of customer no information
		long cntCustomersNoInfor = data.select("CustomerID").filter(data.col("CustomerID").isNull()).count();
		
		double ratio = (double) cntCustomersNoInfor / cntCustomers * 100;
		System.out.printf("Ratio no information: %f \n", ratio);
	}
}

Đóng gói

mvn clean package

Pasted image 20250401152408.png

Chạy file

spark-submit --class spark.main.part_2 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401152335.png

Check output

Pasted image 20250401152201.png
Tỉ lệ khoảng 29,93 % khách hàng không có thông tin

Đâu là nước có số lượng đơn hàng (Quantity) nhiều thứ 3?

Code

package spark.main;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class part_3 {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("Part-2")
				.master("local")
				.getOrCreate();
		
		Dataset<Row> data = spark.read()
				.option("inferSchema", true)
				.option("header", true)
				.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
				
		data.createOrReplaceTempView("data");
		spark.sql("select Country, sum(Quantity) as count from data group by Country order by count desc").show();
	}
}

Đóng gói

mvn clean package

Pasted image 20250401152811.png

Chạy file

spark-submit --class spark.main.part_3 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401152858.png
Pasted image 20250401152908.png
Pasted image 20250401152917.png
Pasted image 20250401152926.png

Check output

Pasted image 20250401152944.png
output sắp xếp từ cao đến thấp => nhiều thứ 3 là EIRE với count = 142637

Từ nào xuất hiện ít nhất trong phần Description?

Code

package spark.main;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.StructType;

public class part_4 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-4")
                .master("local")
                .getOrCreate();
        
        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");
                
        data.where("Description is not null").flatMap(new FlatMapFunction<Row, Row>() {
            private static final long serialVersionUID = 1L;
            private int cnt = 0;
            
            @Override
            public Iterator<Row> call(Row r) throws Exception {
                List<String> listItem = Arrays.asList(r.getString(2).split(" "));
                
                List<Row> listItemRow = new ArrayList<Row>();
                for (String item : listItem) {
                    listItemRow.add(RowFactory.create(cnt, item, 1));
                    cnt++;
                }
                
                return listItemRow.iterator();
            }
        }, Encoders.row(new StructType()
                .add("number", "integer")
                .add("word", "string")
                .add("lit", "integer")))
        .createOrReplaceTempView("data");
        
        spark.sql("select word, count(lit) as count from data group by word order by count desc").show();
    }
}

Đóng gói

mvn clean package

Pasted image 20250401154302.png

Chạy file

spark-submit --class spark.main.part_4 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401154525.png
Pasted image 20250401154544.png
Pasted image 20250401154556.png
Pasted image 20250401154604.png

Check output

Pasted image 20250401154633.png

Đổi code thành asc để hiển thị từ thấp đến lớn

spark.sql("select word, count(lit) as count from data group by word order by count asc").show();

Pasted image 20250401154953.png
Những từ trong bảng là top những xuất hiện ít nhất trong phần description

Sản phẩm nào bán được số lượng (Quantity) lớn nhất ở United Kingdom?

Code

package spark.main;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class part_5 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-4")
                .master("local")
                .getOrCreate();
        
        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");
                
        data.filter(data.col("Country").equalTo("United Kingdom")).createOrReplaceTempView("data");
		spark.sql("select Description, sum(Quantity) as count from data group by Description order by count desc").show();
    
        }
}

Đóng gói

mvn clean package

Pasted image 20250401155357.png

Chạy file

spark-submit --class spark.main.part_5 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401155419.png
Pasted image 20250401155429.png
Pasted image 20250401155439.png
Pasted image 20250401155454.png
Pasted image 20250401155507.png

Check output

Pasted image 20250401155529.png
check tên đầy đủ trong csv là WORLD WAR 2 GLIDERS ASSTD DESIGNS
với số lượng 48326


Spark Streaming - Dsteam (discretized steam)

Phiên bản 1: Xử lý theo từng batch

Lý thuyết

Dùng Spark để lấy dữ liệu streaming từ TCP socket (cổng được set là 9999)
Trên máy sẽ mở cổng 9999 để gõ vào

Mở cổng

Note: mở 1 terminal khác để chạy. Xem cái này như 1 cái server

nc -lk 9999

sau đó gõ vào những câu để count, với mỗi câu nhập vào được xem là 1 batch
Pasted image 20250402090121.png

Để kiểm tra xem đã mở được chưa

netstat -an | grep 9999

Nếu mở được sẽ có trạng thái là LISTEN

tcp        0      0 0.0.0.0:9999            0.0.0.0:*               LISTEN 

Pasted image 20250402090352.png

Code

package spark.main;

import org.apache.spark.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import scala.Tuple2;
import org.apache.log4j.Logger;
import java.util.Iterator;


public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // Set up Spark configuration and streaming context
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

        // Create a DStream that connects to a socket
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        System.out.println("Dang nhan du lieu tu socket...");
        
        lines.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                System.out.println("Du lieu nhan duoc: ");
                rdd.collect().forEach(System.out::println);
            }
        });

        // Process each RDD from the DStream
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
                                                           .reduceByKey((a, b) -> a + b);

        // Print the counts to the console
        wordCounts.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                System.out.println("Word Count:");
                rdd.collect().forEach(System.out::println);
            }
        });

        // Start the streaming computation
        jssc.start();
        System.out.println("Streaming Started");
        jssc.awaitTermination();

    }
}

Đóng gói

mvn clean package

Chạy file

spark-submit --class spark.main.StreamingWordCount target/SparkApp-1.0-SNAPSHOT.jar
  • Lúc này chương trình sẽ chạy liên tục, cập nhật theo thời gian mình đã config ở dòng
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2)); //2 giây
  • Quay lại với terminal mở port 9999, gõ từng dòng vào

Kiểm tra output

  • Kiểm tra chương trình có chạy hay chưa, tìm "Dang nhan du lieu tu socket..."
    Pasted image 20250402091021.png
  • Kiểm tra streaming đang chạy, tìm "Streaming Started"
    Pasted image 20250402091513.png

Sau Khi kiểm tra có 2 dòng này thì chương trình đã chạy bình thường. Tiếp theo sẽ kiểm tra có nhận được dữ liệu từ cổng 9999 hay không và có word count được hay không

  • Kiểm tra có nhận được dữ liệu từ cổng hay không, tìm "Du lieu nhan duoc:"
    Pasted image 20250402091916.png
  • Kiểm tra word count được không, tìm "Word Count:"
    Pasted image 20250402092013.png

Phiên bản 2: Xử lý theo batch, tính dồn lại

Lý thuyết

Ý tưởng của phiên bản 1, nhưng kiểm tra có những từ gì xuất hiện trong toàn bộ quá trình streaming
Cập nhật thêm:

  • Dùng updateStateByKey() để giữ trạng thái cộng dồn số lần xuất hiện của từng từ.
  • Thêm checkpoint bằng jssc.checkpoint("checkpoint"); để Spark có thể duy trì trạng thái lâu dài.
  • Hàm updateFunction giúp Spark cập nhật tổng số lần xuất hiện từ trước đến hiện tại.

Code

package spark.main;

import org.apache.spark.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import scala.Tuple2;
import org.apache.log4j.Logger;
import java.util.Iterator;
import java.util.*;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // Set up Spark configuration and streaming context
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

        jssc.checkpoint("checkpoint");

        // Create a DStream that connects to a socket
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        System.out.println("Dang nhan du lieu tu socket...");
        
        lines.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                System.out.println("Du lieu nhan duoc: ");
                rdd.collect().forEach(System.out::println);
            }
        });

        // Process each RDD from the DStream
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                                 .map(word -> word.toLowerCase());

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
                                                           .reduceByKey(Integer::sum);

        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = (newValues, state) -> {
            int sum = state.orElse(0);
            for (int val : newValues) {
                sum += val;
            }
            return Optional.of(sum);
        };

        JavaPairDStream<String, Integer> cumulativeWordCounts = wordCounts.updateStateByKey(updateFunction);


        // Print the counts to the console
        cumulativeWordCounts.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                System.out.println("Word Count:");
                rdd.collect().forEach(System.out::println);
            }
        });

        // Start the streaming computation
        jssc.start();
        System.out.println("Streaming Started");
        jssc.awaitTermination();
    }
}

Đóng gói

mvn clean package

Chạy file

spark-submit --class spark.main.StreamingWordCount target/SparkApp-1.0-SNAPSHOT.jar
  • Quay lại với terminal mở port 9999, gõ từng dòng vào

Kiểm tra output

  • Dữ liệu nhập vào
    Pasted image 20250402101418.png
  • Kiểm tra word count được không, tìm "Word Count:"
    Pasted image 20250402101435.png